
[WIP][GLUTEN-4213] Join support pullout pre-project #10851

Draft
liujiayi771 wants to merge 4 commits into apache:main from liujiayi771:join-pullout-2025

Conversation

@liujiayi771 (Contributor)

What changes are proposed in this pull request?

Support pulling out a pre-project for join operators: non-trivial join key expressions are computed in a separate Project beneath the join, so the join itself only compares plain attributes.
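
For illustration, a join key that is a non-trivial expression (e.g. a UDF call) gets materialized in a child Project as a fresh attribute, like `_pre_166` in the plans below, and the join is rewritten to compare that attribute. A minimal sketch of the idea, using made-up `Attr`/`Udf` types rather than the real Catalyst or Gluten classes:

```scala
// Toy model of the pull-out rewrite (runnable standalone); Attr, Udf and the
// "_pre" prefix are illustrative stand-ins for Catalyst expressions and
// Gluten's generated alias names, not the real APIs.
object PullOutPreProjectSketch extends App {
  sealed trait Expr
  case class Attr(name: String) extends Expr            // plain column reference
  case class Udf(fn: String, child: Expr) extends Expr  // non-trivial key expression

  // Replace every non-attribute join key with a fresh attribute and collect
  // the (alias, expression) pairs a pre-project must compute below the join.
  def pullOutKeys(keys: Seq[Expr], prefix: String): (Seq[Expr], Seq[(String, Expr)]) = {
    val pulled = Seq.newBuilder[(String, Expr)]
    val newKeys: Seq[Expr] = keys.zipWithIndex.map {
      case (a: Attr, _) => a                             // already a column: keep as-is
      case (e, i) =>
        val alias = s"${prefix}_$i"                      // like _pre_166 in the plan
        pulled += alias -> e
        Attr(alias)
    }
    (newKeys, pulled.result())
  }

  // Join on udf(k) = k2: the left side gains Project[udf(k) AS _pre_0] and the
  // join condition becomes the plain column comparison _pre_0 = k2.
  val (newLeftKeys, leftPreProject) = pullOutKeys(Seq(Udf("udf", Attr("k"))), "_pre")
  println(newLeftKeys)     // List(Attr(_pre_0))
  println(leftPreProject)  // List((_pre_0,Udf(udf,Attr(k))))
}
```

The real rule must also keep the original columns visible to the parent, which is why the pre-project in the plans below outputs both k#1276 and _pre_166#1305.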

How was this patch tested?

Existing unit test cases.

@github-actions bot added the CORE (works for Gluten Core) and VELOX labels Oct 8, 2025
@liujiayi771 marked this pull request as draft October 8, 2025 16:12
@zhztheplayer (Member) commented Oct 10, 2025

For a test query (likely the 19th) in udf/udf-except-all.sql, the plans generated by RAS and LEGACY are different:

RAS:

ColumnarExchange hashpartitioning(k#1276, udf(v)#1292, 4), ENSURE_REQUIREMENTS, [k#1276, udf(v)#1292, sum#1301L], [plan_id=66961], [shuffle_writer_type=hash], [output=[k#1276: int, udf(v)#1292: int, sum#1301: bigint]]
+- VeloxResizeBatches 1024, 2147483647
   +- ^(190) ProjectExecTransformer [hash(k#1276, udf(v)#1292, 42) AS hash_partition_key#1307, k#1276, udf(v)#1292, sum#1301L]
      +- ^(190) FlushableHashAggregateTransformer(keys=[k#1276, udf(v)#1292], functions=[partial_sum(vcol#1296L)], isStreamingAgg=false, ignoreNullKeys=false, output=[k#1276, udf(v)#1292, sum#1301L])
         +- ^(190) InputIteratorTransformer[vcol#1296L, k#1276, udf(v)#1292]
            +- ColumnarUnion
               :- ^(187) ProjectExecTransformer [1 AS vcol#1296L, k#1276, cast(_SparkPartialProject0#1309 as int) AS udf(v)#1292]
               :  +- ^(187) InputIteratorTransformer[k#1276, k#1282, v#1283, _SparkPartialProject0#1309]
               :     +- ColumnarPartialProject [1 AS vcol#1296L, k#1276, cast(udf(cast(v#1283 as string)) as int) AS udf(v)#1292] PartialProject List(udf(cast(v#1283 as string)) AS _SparkPartialProject0#1309)
               :        +- ^(186) ProjectExecTransformer [k#1276, k#1282, v#1283]
               :           +- ^(186) BroadcastHashJoinExecTransformer [_pre_166#1305], [k#1282], Inner, BuildLeft, false
               :              :- ^(186) InputIteratorTransformer[k#1276, _pre_166#1305]
               :              :  +- RowToVeloxColumnar
               :              :     +- *(1) Project [k#1276, cast(udf(cast(k#1276 as string)) as int) AS _pre_166#1305]
               :              :        +- VeloxColumnarToRow
               :              :           +- BroadcastQueryStage 0
               :              :              +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(cast(udf(cast(input[0, int, false] as string)) as int) as bigint)),false), [plan_id=65321]
               :              :                 +- ^(184) ProjectExecTransformer [k#1278 AS k#1276]
               :              :                    +- ^(184) InputIteratorTransformer[k#1278, v#1279]
               :              :                       +- RowToVeloxColumnar
               :              :                          +- LocalTableScan [k#1278, v#1279]
               :              +- ^(186) InputIteratorTransformer[k#1282, v#1283]
               :                 +- RowToVeloxColumnar
               :                    +- LocalTableScan [k#1282, v#1283]
               +- ^(189) ProjectExecTransformer [-1 AS vcol#1297L, cast(_SparkPartialProject0#1310 as int) AS udf(k)#1293, v#1291]
                  +- ^(189) InputIteratorTransformer[k#1286, k#1290, v#1291, _SparkPartialProject0#1310]
                     +- ColumnarPartialProject [-1 AS vcol#1297L, cast(udf(cast(k#1286 as string)) as int) AS udf(k)#1293, v#1291] PartialProject List(udf(cast(k#1286 as string)) AS _SparkPartialProject0#1310)
                        +- ^(188) ProjectExecTransformer [k#1286, k#1290, v#1291]
                           +- ^(188) BroadcastHashJoinExecTransformer [k#1286], [_pre_167#1306], Inner, BuildLeft, false
                              :- ^(188) InputIteratorTransformer[k#1286]
                              :  +- BroadcastQueryStage 1
                              :     +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=65497]
                              :        +- ^(185) ProjectExecTransformer [k#1286]
                              :           +- ^(185) InputIteratorTransformer[k#1286, v#1287]
                              :              +- RowToVeloxColumnar
                              :                 +- LocalTableScan [k#1286, v#1287]
                              +- ^(188) InputIteratorTransformer[k#1290, v#1291, _pre_167#1306]
                                 +- RowToVeloxColumnar
                                    +- *(2) Project [k#1290, v#1291, cast(udf(cast(k#1290 as string)) as int) AS _pre_167#1306]
                                       +- *(2) LocalTableScan [k#1290, v#1291]

LEGACY:

ColumnarExchange hashpartitioning(k#1317, udf(v)#1333, 4), ENSURE_REQUIREMENTS, [k#1317, udf(v)#1333, sum#1342L], [plan_id=47109], [shuffle_writer_type=hash], [output=[k#1317: int, udf(v)#1333: int, sum#1342: bigint]]
+- VeloxResizeBatches 1024, 2147483647
   +- ^(186) ProjectExecTransformer [hash(k#1317, udf(v)#1333, 42) AS hash_partition_key#1349, k#1317, udf(v)#1333, sum#1342L]
      +- ^(186) FlushableHashAggregateTransformer(keys=[k#1317, udf(v)#1333], functions=[partial_sum(vcol#1337L)], isStreamingAgg=false, ignoreNullKeys=false, output=[k#1317, udf(v)#1333, sum#1342L])
         +- ^(186) InputIteratorTransformer[vcol#1337L, k#1317, udf(v)#1333]
            +- ColumnarUnion
               :- RowToVeloxColumnar
               :  +- *(1) Project [1 AS vcol#1337L, k#1317, cast(udf(cast(v#1324 as string)) as int) AS udf(v)#1333]
               :     +- *(1) BroadcastHashJoin [cast(udf(cast(k#1317 as string)) as int)], [k#1323], Inner, BuildLeft, false
               :        :- VeloxColumnarToRow
               :        :  +- BroadcastQueryStage 0
               :        :     +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(cast(udf(cast(input[0, int, false] as string)) as int) as bigint)),false), [plan_id=46256]
               :        :        +- ^(184) ProjectExecTransformer [k#1319 AS k#1317]
               :        :           +- ^(184) InputIteratorTransformer[k#1319, v#1320]
               :        :              +- RowToVeloxColumnar
               :        :                 +- LocalTableScan [k#1319, v#1320]
               :        +- *(1) LocalTableScan [k#1323, v#1324]
               +- RowToVeloxColumnar
                  +- *(2) Project [-1 AS vcol#1338L, cast(udf(cast(k#1327 as string)) as int) AS udf(k)#1334, v#1332]
                     +- *(2) BroadcastHashJoin [k#1327], [cast(udf(cast(k#1331 as string)) as int)], Inner, BuildLeft, false
                        :- VeloxColumnarToRow
                        :  +- BroadcastQueryStage 1
                        :     +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=46391]
                        :        +- ^(185) ProjectExecTransformer [k#1327]
                        :           +- ^(185) InputIteratorTransformer[k#1327, v#1328]
                        :              +- RowToVeloxColumnar
                        :                 +- LocalTableScan [k#1327, v#1328]
                        +- *(2) LocalTableScan [k#1331, v#1332]

The error is:

org.apache.spark.SparkUnsupportedOperationException: WholeStageCodegen (1) does not implement doExecuteBroadcast.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.doExecuteBroadcastNotImplementedError(QueryExecutionErrors.scala:2374)
	at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:326)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:208)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:204)
	at org.apache.gluten.execution.RowToVeloxColumnarExec.doExecuteBroadcast(RowToVeloxColumnarExec.scala:76)
	at org.apache.spark.sql.execution.ColumnarInputAdapter.doExecuteBroadcast(ColumnarCollapseTransformStages.scala:207)
	at org.apache.spark.sql.execution.InputIteratorTransformer.doExecuteBroadcast(ColumnarCollapseTransformStages.scala:72)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:208)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:204)
	at org.apache.gluten.execution.BroadcastHashJoinExecTransformer.columnarInputRDDs(HashJoinExecTransformer.scala:128)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.ProjectExecTransformerBase.getColumnarInputRDDs(BasicPhysicalOperatorTransformer.scala:167)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.ProjectExecTransformerBase.columnarInputRDDs(BasicPhysicalOperatorTransformer.scala:167)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.WholeStageTransformer.getColumnarInputRDDs(WholeStageTransformer.scala:151)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.WholeStageTransformer.columnarInputRDDs(WholeStageTransformer.scala:151)
	at org.apache.gluten.execution.WholeStageTransformer.doExecuteColumnar(WholeStageTransformer.scala:430)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.gluten.execution.ColumnarPartialProjectExec.doExecuteColumnar(ColumnarPartialProjectExec.scala:164)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.ColumnarInputAdapter.doExecuteColumnar(ColumnarCollapseTransformStages.scala:204)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:129)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.spark.sql.execution.InputIteratorTransformer.getColumnarInputRDDs(ColumnarCollapseTransformStages.scala:48)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.spark.sql.execution.InputIteratorTransformer.columnarInputRDDs(ColumnarCollapseTransformStages.scala:48)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.ProjectExecTransformerBase.getColumnarInputRDDs(BasicPhysicalOperatorTransformer.scala:167)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.ProjectExecTransformerBase.columnarInputRDDs(BasicPhysicalOperatorTransformer.scala:167)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.WholeStageTransformer.getColumnarInputRDDs(WholeStageTransformer.scala:151)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.WholeStageTransformer.columnarInputRDDs(WholeStageTransformer.scala:151)
	at org.apache.gluten.execution.WholeStageTransformer.doExecuteColumnar(WholeStageTransformer.scala:430)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.gluten.execution.ColumnarUnionExec.$anonfun$columnarInputRDD$1(BasicPhysicalOperatorTransformer.scala:277)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.gluten.execution.ColumnarUnionExec.columnarInputRDD(BasicPhysicalOperatorTransformer.scala:277)
	at org.apache.gluten.execution.ColumnarUnionExec.doExecuteColumnar(BasicPhysicalOperatorTransformer.scala:285)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.ColumnarInputAdapter.doExecuteColumnar(ColumnarCollapseTransformStages.scala:204)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:129)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.spark.sql.execution.InputIteratorTransformer.getColumnarInputRDDs(ColumnarCollapseTransformStages.scala:48)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.spark.sql.execution.InputIteratorTransformer.columnarInputRDDs(ColumnarCollapseTransformStages.scala:48)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.HashAggregateExecBaseTransformer.getColumnarInputRDDs(HashAggregateExecBaseTransformer.scala:35)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.HashAggregateExecBaseTransformer.columnarInputRDDs(HashAggregateExecBaseTransformer.scala:35)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.ProjectExecTransformerBase.getColumnarInputRDDs(BasicPhysicalOperatorTransformer.scala:167)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.ProjectExecTransformerBase.columnarInputRDDs(BasicPhysicalOperatorTransformer.scala:167)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs(WholeStageTransformer.scala:127)
	at org.apache.gluten.execution.TransformSupport.getColumnarInputRDDs$(WholeStageTransformer.scala:124)
	at org.apache.gluten.execution.WholeStageTransformer.getColumnarInputRDDs(WholeStageTransformer.scala:151)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs(WholeStageTransformer.scala:147)
	at org.apache.gluten.execution.UnaryTransformSupport.columnarInputRDDs$(WholeStageTransformer.scala:146)
	at org.apache.gluten.execution.WholeStageTransformer.columnarInputRDDs(WholeStageTransformer.scala:151)
	at org.apache.gluten.execution.WholeStageTransformer.doExecuteColumnar(WholeStageTransformer.scala:430)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.gluten.execution.ColumnarToColumnarExec.doExecuteColumnar(ColumnarToColumnarExec.scala:72)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.ColumnarShuffleExchangeExec.inputColumnarRDD$lzycompute(ColumnarShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.ColumnarShuffleExchangeExec.inputColumnarRDD(ColumnarShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.ColumnarShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ColumnarShuffleExchangeExec.scala:71)
	at org.apache.spark.sql.execution.ColumnarShuffleExchangeExec.mapOutputStatisticsFuture(ColumnarShuffleExchangeExec.scala:70)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:73)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:73)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:72)
	at org.apache.spark.sql.execution.ColumnarShuffleExchangeExec.submitShuffleJob(ColumnarShuffleExchangeExec.scala:43)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:194)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:194)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:196)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:61)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:302)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:300)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:300)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:475)
	at org.apache.spark.sql.execution.HiveResult$.hiveResultString(HiveResult.scala:76)
	at org.apache.spark.sql.SQLQueryTestHelper.$anonfun$getNormalizedQueryExecutionResult$2(SQLQueryTestHelper.scala:119)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.SQLQueryTestHelper.getNormalizedQueryExecutionResult(SQLQueryTestHelper.scala:119)
	at org.apache.spark.sql.SQLQueryTestHelper.getNormalizedQueryExecutionResult$(SQLQueryTestHelper.scala:102)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.getNormalizedQueryExecutionResult(GlutenSQLQueryTestSuite.scala:60)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.$anonfun$runQueries$9(GlutenSQLQueryTestSuite.scala:712)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.handleExceptions(GlutenSQLQueryTestSuite.scala:161)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.$anonfun$runQueries$7(GlutenSQLQueryTestSuite.scala:712)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.runQueries(GlutenSQLQueryTestSuite.scala:699)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.$anonfun$runSqlTestCase$34(GlutenSQLQueryTestSuite.scala:601)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.$anonfun$runSqlTestCase$34$adapted(GlutenSQLQueryTestSuite.scala:599)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.runSqlTestCase(GlutenSQLQueryTestSuite.scala:599)
	at org.apache.spark.sql.GlutenSQLQueryTestSuite.$anonfun$createScalaTestCase$5(GlutenSQLQueryTestSuite.scala:488)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
	at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
	at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
	at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
	at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
	at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)

I haven't investigated the discrepancy here yet, but this part of the RAS-generated query plan looks a bit off:

+- ^(186) BroadcastHashJoinExecTransformer [_pre_166#1305], [k#1282], Inner, BuildLeft, false
   :- ^(186) InputIteratorTransformer[k#1276, _pre_166#1305]
   :  +- RowToVeloxColumnar
   :     +- *(1) Project [k#1276, cast(udf(cast(k#1276 as string)) as int) AS _pre_166#1305]
   :        +- VeloxColumnarToRow
   :           +- BroadcastQueryStage 0
   +- ...

Here the pulled-out Project [k#1276, cast(udf(cast(k#1276 as string)) as int) AS _pre_166#1305] doesn't implement doExecuteBroadcast, which causes the error.

I guess this is a plan pattern that the LEGACY planner can also generate? Should we really pull out the project on BHJ's build side?
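
To make the failure mode concrete: SparkPlan.doExecuteBroadcast throws by default, and essentially only broadcast-exchange-style nodes override it, so any plain row-based node left between the query stage and the join's build side breaks executeBroadcast. A toy reconstruction of that dispatch (not Spark's real class hierarchy):

```scala
// Toy reconstruction of the broadcast dispatch; class names are illustrative.
object BroadcastDispatchSketch extends App {
  abstract class Plan {
    // Mirrors SparkPlan's default: throw unless a subclass opts in.
    def doExecuteBroadcast(): String =
      throw new UnsupportedOperationException(
        s"${getClass.getSimpleName} does not implement doExecuteBroadcast")
  }
  // Broadcast exchanges/stages are the nodes that actually implement it.
  class BroadcastStage extends Plan {
    override def doExecuteBroadcast(): String = "hashed relation"
  }
  // A plain Project (or the WholeStageCodegen wrapping it) inherits the default.
  class ProjectNode(val child: Plan) extends Plan

  val buildSide = new ProjectNode(new BroadcastStage)
  // The join calls executeBroadcast on its build child; with the pulled-out
  // Project in between, the call lands on a node with no override and throws,
  // matching "WholeStageCodegen (1) does not implement doExecuteBroadcast".
  try buildSide.doExecuteBroadcast()
  catch { case e: UnsupportedOperationException => println(e.getMessage) }
}
```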

@liujiayi771 (Contributor, Author) commented Oct 11, 2025

@zhztheplayer LEGACY also has this issue. I added rewrite-fallback logic for the join: when the pulled-out project would fall back (this can definitely be made more granular, e.g. focusing only on the broadcast-join case), the join itself falls back together with it. Here it seems that RAS did not restore the plan and still generated a pre-project; I can debug the reason.
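
Roughly, the coupling works as sketched below; the names are hypothetical and the real checks live in Gluten's rewrite and validation rules:

```scala
// Toy sketch of the "fall back together" coupling (hypothetical names).
object JoinFallbackSketch extends App {
  sealed trait Node
  case class Project(offloadable: Boolean, child: Node) extends Node
  case class Join(left: Node, right: Node) extends Node
  case class Scan(name: String) extends Node

  // After pulling out a pre-project, validate it; if it would fall back to
  // row-based execution, drop the rewrite so the join stays in its original
  // shape and falls back as a whole instead of mixing row/columnar halves.
  def applyOrRestore(original: Join, rewritten: Join): Join = {
    val preProjectsOk = Seq(rewritten.left, rewritten.right).forall {
      case Project(offloadable, _) => offloadable
      case _                       => true
    }
    if (preProjectsOk) rewritten else original
  }

  val original  = Join(Scan("t1"), Scan("t2"))
  val rewritten = Join(Project(offloadable = false, Scan("t1")), Scan("t2"))
  println(applyOrRestore(original, rewritten) == original) // true: restored
}
```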

However, while I found that the join's pre-project can be handled through the pull-out framework, the post-project turned out to be difficult to implement: the output of a join is not controlled by resultExpressions as it is in aggregate or window; instead, it is derived entirely from the left and right inputs. It seems this is not fully achievable at the moment.
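
A schematic contrast of the two operator shapes (illustrative fields only, not Catalyst's actual classes):

```scala
// Why post-project pull-out works for aggregate/window but not for join.
object OutputShapeSketch extends App {
  case class Attr(name: String)

  // Aggregate carries an explicit output expression list (resultExpressions),
  // so a post-project can be split off by replacing each expression with a
  // reference to a pre-computed attribute.
  case class Aggregate(groupingKeys: Seq[Attr], resultExpressions: Seq[String]) {
    def output: Seq[Attr] = resultExpressions.map(Attr)
  }

  // Join has no such slot: its output is derived mechanically from the two
  // children, leaving no expression list to rewrite into a post-project.
  case class Join(leftOutput: Seq[Attr], rightOutput: Seq[Attr]) {
    def output: Seq[Attr] = leftOutput ++ rightOutput
  }

  println(Aggregate(Seq(Attr("k")), Seq("sum(v)")).output) // controlled by expressions
  println(Join(Seq(Attr("k")), Seq(Attr("v"))).output)     // fixed: left ++ right
}
```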

@github-actions bot added the stale label Dec 18, 2025
@github-actions bot closed this Dec 28, 2025
@liujiayi771 reopened this Mar 6, 2026
@liujiayi771 force-pushed the join-pullout-2025 branch 2 times, most recently from 959cd88 to 6e96ff8 on March 6, 2026 16:03

@github-actions bot removed the stale label Mar 7, 2026
